gRPC with Python

The Surprising Part: Your REST API Is Slow Between Services

# REST call between internal services (what most teams do)
import httpx

async def classify_document_rest(text: str) -> dict:
    # A new client per call also means a new connection per call;
    # share one AsyncClient across calls to benefit from keep-alive.
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://classification-service/classify",
            json={"text": text},  # JSON serialisation: ~0.5ms
            headers={"Content-Type": "application/json"},
        )
        return response.json()  # JSON deserialisation: ~0.3ms

# What this costs per call:
# - TCP handshake (if no keep-alive): 1 RTT
# - TLS handshake: 1-2 RTT
# - HTTP/1.1 header overhead: 200-800 bytes of ASCII headers
# - JSON serialisation + deserialisation: ~0.8ms for 1KB payload
# - HTTP/1.1 head-of-line blocking: one in-flight request per connection

# At 10,000 classification calls per minute, JSON overhead alone = 8 seconds of CPU time per minute

# gRPC call - same operation
import grpc
from classification_pb2_grpc import DocumentClassifierStub
from classification_pb2 import ClassifyRequest

async def classify_document_grpc(text: str, stub: DocumentClassifierStub) -> dict:
    request = ClassifyRequest(text=text)
    response = await stub.Classify(request)  # Protobuf: ~0.05ms to serialise
    top = response.results[0]  # assumes the server returned at least one result
    return {"label": top.document_type, "confidence": top.confidence}

# What this costs per call:
# - Reuses existing HTTP/2 multiplexed connection (no handshake)
# - Protobuf binary: typically 3-10x smaller than equivalent JSON for structured data
# - HTTP/2 multiplexing: no HTTP-level head-of-line blocking
# - Generated code: no runtime field name parsing
# - At 10,000 calls per minute: ~500ms CPU time per minute for serialisation

The difference is not always significant. For a service called 10 times per minute, REST is fine. For a service called 10,000 times per minute by multiple callers, gRPC is worth the setup cost. This lesson will show you exactly when, and how.
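The per-call figures in the comments above are illustrative; real numbers depend on hardware, payload shape, and library. A minimal sketch for measuring JSON round-trip cost on your own machine, using only the standard library. The payload shape and the helper name `measure_json_roundtrip_ms` are assumptions made for this example, not part of the lesson's services.

```python
import json
import timeit

def measure_json_roundtrip_ms(payload: dict, runs: int = 2000) -> float:
    """Average time in milliseconds for one dumps + loads round trip."""
    total_seconds = timeit.timeit(
        lambda: json.loads(json.dumps(payload)),
        number=runs,
    )
    return total_seconds / runs * 1000

# Hypothetical request body padded to roughly 1 KB of text
payload = {"text": "invoice due on January 15th " * 36, "document_id": "doc-001"}
encoded_size = len(json.dumps(payload).encode("utf-8"))

if __name__ == "__main__":
    print(f"payload size: {encoded_size} bytes")
    print(f"round trip:   {measure_json_roundtrip_ms(payload):.4f} ms")
```

Run the same measurement against `SerializeToString()` and `ParseFromString()` on the generated Protobuf message to compare the two formats for your own payloads before committing to a migration.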

What You Will Learn

  • Why gRPC - REST vs gRPC comparison, when each wins
  • Protocol Buffers - .proto file anatomy, types, generating Python code
  • Unary RPC - complete client and server implementation
  • Server-side streaming - streaming batch results back
  • Client-side streaming - streaming uploads
  • Bidirectional streaming - real-time processing
  • Interceptors - logging, auth, retry at the gRPC layer
  • Error Handling - StatusCode, structured errors, HTTP mapping
  • Decision Guide - concrete criteria for gRPC vs REST

Prerequisites: Python async/await, FastAPI basics (Lesson 01), Docker.

Part 1: REST vs gRPC - An Honest Comparison

The Comparison Table

| Feature | REST / JSON | gRPC / Protobuf |
| --- | --- | --- |
| Protocol | HTTP/1.1 or HTTP/2 | HTTP/2 (required) |
| Payload format | JSON (text) | Protocol Buffers (binary) |
| Payload size | Larger (field names repeated every message) | 3–10x smaller |
| Serialisation speed | ~0.8 ms for 1 KB | ~0.05 ms for equivalent |
| Schema enforcement | Optional (OpenAPI, manual) | Required (.proto files) |
| Code generation | Optional (openapi-generator) | Required (protoc) |
| Browser support | Native | Requires grpc-web proxy |
| Streaming | SSE, WebSocket (separate protocols) | Native (4 patterns) |
| Load balancing | L7 (any reverse proxy) | L7 with HTTP/2 awareness required |
| Debugging | curl, browser, Postman | grpcurl, grpc-client-cli |
| Learning curve | Low | Medium-high |
| When to choose | Public APIs, browser clients, simple internal | High-frequency internal, streaming, polyglot |

The Honest Benchmark Thought Experiment

Consider classifying 1 million documents per day:

REST / JSON:
- 1,000,000 calls × 0.8ms serialisation = 800 seconds of CPU
- Payload: 1 KB JSON per request = 1 GB of data transferred (internal network)
- Connection overhead: if not reusing keep-alive, adds 1-3ms per call

gRPC / Protobuf:
- 1,000,000 calls × 0.05ms serialisation = 50 seconds of CPU (16x faster)
- Payload: ~100 bytes protobuf per request = 100 MB of data transferred
- Connection: HTTP/2 multiplexes all calls over persistent connections

For this workload, gRPC saves ~750 CPU-seconds per day and 900 MB of internal network traffic. For a smaller workload (1,000 calls/day), the difference is negligible.
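The arithmetic behind the thought experiment can be reproduced in a few lines. This is only a sketch that reuses the per-call figures quoted above (0.8 ms and 1 KB for JSON, 0.05 ms and 100 bytes for Protobuf); the function name `daily_cost` is made up for the example, and 1 KB is taken as 1,000 bytes.

```python
CALLS_PER_DAY = 1_000_000

def daily_cost(ser_ms_per_call: float, payload_bytes: int, calls: int = CALLS_PER_DAY):
    """Return (CPU seconds, megabytes transferred) for one day of calls."""
    cpu_seconds = calls * ser_ms_per_call / 1000
    megabytes = calls * payload_bytes / 1_000_000
    return cpu_seconds, megabytes

rest_cpu, rest_mb = daily_cost(ser_ms_per_call=0.8, payload_bytes=1000)
grpc_cpu, grpc_mb = daily_cost(ser_ms_per_call=0.05, payload_bytes=100)

print(f"REST:  {rest_cpu:.0f} CPU-s, {rest_mb:.0f} MB")
print(f"gRPC:  {grpc_cpu:.0f} CPU-s, {grpc_mb:.0f} MB")
print(f"Saved: {rest_cpu - grpc_cpu:.0f} CPU-s, {rest_mb - grpc_mb:.0f} MB")
```

Passing `calls=1_000` shows why the small workload does not justify the switch: the saving drops to under one CPU-second and under one megabyte per day.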

Part 2: Protocol Buffers

The .proto File

// protos/classification.proto
syntax = "proto3";

package classification;

// Go package option (used when generating Go code from this proto)
option go_package = "github.com/example/doc-intelligence/classification";
// Python has no equivalent file option: the generated module name
// (classification_pb2) comes from the .proto file name and its path under -I.

// ─── Scalar Types ───────────────────────────────────────────────────
// double, float, int32, int64, uint32, uint64, sint32, sint64,
// fixed32, fixed64, sfixed32, sfixed64, bool, string, bytes

// ─── Enum ──────────────────────────────────────────────────────────
enum DocumentType {
DOCUMENT_TYPE_UNSPECIFIED = 0; // Always have a default 0 value
DOCUMENT_TYPE_INVOICE = 1;
DOCUMENT_TYPE_CONTRACT = 2;
DOCUMENT_TYPE_REPORT = 3;
DOCUMENT_TYPE_RECEIPT = 4;
DOCUMENT_TYPE_LETTER = 5;
}

// ─── Messages ──────────────────────────────────────────────────────

// Request message - what the client sends
message ClassifyRequest {
string text = 1; // Field number 1 - unique, never reuse
string document_id = 2; // For logging and idempotency
string model_version = 3; // Optionally request a specific model
int32 max_results = 4; // How many labels to return (top-N)
}

// Response message - what the server returns
message ClassifyResponse {
string document_id = 1;
repeated ClassificationResult results = 2; // 'repeated' = list/array
string model_version = 3;
int64 processing_time_ms = 4;
}

// Nested message
message ClassificationResult {
DocumentType document_type = 1;
float confidence = 2; // 0.0 to 1.0
repeated string keywords = 3; // repeated nested in repeated is fine
}

// Batch request - uses 'repeated' for input too
message ClassifyBatchRequest {
repeated ClassifyRequest requests = 1;
bool include_keywords = 2;
}

// For streaming - streamed one at a time
message ClassifyBatchResponse {
ClassifyResponse result = 1;
int32 sequence_number = 2; // Position in batch
bool is_last = 3;
}

// For upload streaming
message DocumentChunk {
bytes data = 1; // Chunk of binary data
string document_id = 2; // Stable across all chunks
int32 chunk_number = 3;
bool is_last = 4;
string mime_type = 5; // Only sent in first chunk
}

message UploadResponse {
string document_id = 1;
int64 total_bytes = 2;
int32 total_chunks = 3;
string storage_key = 4;
}

// oneof - at most one of these fields can be set at a time
message ClassificationEvent {
oneof event_type {
ClassifyRequest classification_started = 1;
ClassifyResponse classification_complete = 2;
ErrorEvent classification_failed = 3;
}
string correlation_id = 4;
}

// map<key_type, value_type> - dictionary
message ModelMetadata {
string model_name = 1;
string version = 2;
map<string, string> labels = 3; // label_id -> label_name
map<string, float> thresholds = 4; // label_id -> confidence threshold
}

message ErrorEvent {
string code = 1;
string message = 2;
map<string, string> metadata = 3;
}

// ─── Service Definition ─────────────────────────────────────────────

service DocumentClassifier {
// Unary: client sends ONE request, server returns ONE response
rpc Classify(ClassifyRequest) returns (ClassifyResponse);

// Server-side streaming: client sends ONE request, server streams MANY responses
rpc ClassifyBatch(ClassifyBatchRequest) returns (stream ClassifyBatchResponse);

// Client-side streaming: client streams MANY requests, server returns ONE response
rpc UploadDocument(stream DocumentChunk) returns (UploadResponse);

// Bidirectional streaming: both sides stream simultaneously
rpc ClassifyStream(stream ClassificationEvent) returns (stream ClassificationEvent);
}

Generating Python Code

# Install dependencies
pip install grpcio grpcio-tools

# Generate Python code from the proto file
# This creates two files:
# classification_pb2.py - message classes
# classification_pb2_grpc.py - service stubs and servicers

python -m grpc_tools.protoc \
-I./protos \
--python_out=./classification_service \
--grpc_python_out=./classification_service \
./protos/classification.proto

# If you have multiple proto files that import each other:
python -m grpc_tools.protoc \
-I./protos \
--python_out=./generated \
--grpc_python_out=./generated \
./protos/classification.proto \
./protos/common.proto

# For better type hints (optional but recommended):
pip install grpc-stubs mypy-protobuf
python -m grpc_tools.protoc \
-I./protos \
--python_out=./generated \
--grpc_python_out=./generated \
--mypy_out=./generated \
./protos/classification.proto

Understanding the Generated Python Classes

# What protoc generates - simplified view
# classification_pb2.py (generated, do not edit)

# Message classes are generated as Python classes
# Field access uses attribute syntax
request = ClassifyRequest(
text="This invoice is due on January 15th...",
document_id="doc-001",
max_results=3,
)
print(request.text) # "This invoice is due on January 15th..."
print(request.document_id) # "doc-001"
print(request.max_results) # 3

# Serialisation
serialised = request.SerializeToString() # bytes
print(len(serialised)) # Much smaller than JSON equivalent

# Deserialisation
recovered = ClassifyRequest()
recovered.ParseFromString(serialised)

# Working with repeated fields (lists)
response = ClassifyResponse()
response.results.append(
ClassificationResult(
document_type=DocumentType.DOCUMENT_TYPE_INVOICE,
confidence=0.97,
)
)
response.results[0].keywords.extend(["invoice", "payment", "due date"])

# Working with oneof
event = ClassificationEvent(
correlation_id="corr-xyz",
classification_started=ClassifyRequest(text="test"),
)
# Check which field is set
print(event.WhichOneof("event_type")) # "classification_started"

# Working with map
metadata = ModelMetadata()
metadata.labels["INV"] = "Invoice"
metadata.labels["CON"] = "Contract"
metadata.thresholds["INV"] = 0.85
print(dict(metadata.labels)) # {"INV": "Invoice", "CON": "Contract"}

Part 3: Unary RPC - Complete Server and Client

The gRPC Server

# classification_service/server.py
import asyncio
import logging
import time

import grpc
from grpc import aio

import classification_pb2 as pb2
import classification_pb2_grpc as pb2_grpc
from classification_service.model import DocumentClassifierModel
from classification_service.interceptors import LoggingInterceptor, AuthInterceptor

logger = logging.getLogger("classification_service.grpc")

class DocumentClassifierServicer(pb2_grpc.DocumentClassifierServicer):
    """
    Implements the DocumentClassifier gRPC service.
    Inherits from the generated servicer base class.
    """

    def __init__(self, model: DocumentClassifierModel):
        self._model = model

    async def Classify(
        self,
        request: pb2.ClassifyRequest,
        context: grpc.aio.ServicerContext,
    ) -> pb2.ClassifyResponse:
        """
        Unary RPC - one request, one response.
        'context' gives access to request metadata, deadline, peer info.
        """
        start = time.perf_counter()

        # Access request metadata (headers in gRPC)
        metadata = dict(context.invocation_metadata())
        correlation_id = metadata.get("x-correlation-id", "unknown")

        logger.info(
            "Classify RPC called",
            extra={
                "document_id": request.document_id,
                "text_length": len(request.text),
                "correlation_id": correlation_id,
                "peer": context.peer(),  # Client address
            },
        )

        # Validate request. abort() raises, so nothing after it runs.
        if not request.text.strip():
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "text field is required and cannot be empty",
            )
            return pb2.ClassifyResponse()  # Never reached, but satisfies type checker

        if len(request.text) > 100_000:
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "text exceeds maximum length of 100,000 characters",
            )

        # Check if the client has already cancelled (important for long operations)
        if context.cancelled():
            logger.info(f"Client cancelled request for doc {request.document_id}")
            return pb2.ClassifyResponse()

        try:
            # Run ML inference - synchronous, so run in thread pool
            loop = asyncio.get_running_loop()
            results = await loop.run_in_executor(
                None,  # Use default thread pool executor
                self._model.classify,
                request.text,
                request.max_results or 3,  # proto3 default 0 means "not set"
            )
        except Exception as exc:
            logger.error(f"Model inference failed: {exc}", exc_info=True)
            await context.abort(
                grpc.StatusCode.INTERNAL,
                "Classification model error",
            )
            return pb2.ClassifyResponse()

        elapsed_ms = int((time.perf_counter() - start) * 1000)

        # Build the response protobuf message
        response = pb2.ClassifyResponse(
            document_id=request.document_id,
            model_version=self._model.version,
            processing_time_ms=elapsed_ms,
        )
        for label, confidence, keywords in results:
            result = pb2.ClassificationResult(
                document_type=label,
                confidence=confidence,
            )
            result.keywords.extend(keywords)
            response.results.append(result)

        return response

async def serve():
    """Start the gRPC server."""
    # Load model once at startup
    model = DocumentClassifierModel.load("models/classifier_v3.pt")

    # Create server with interceptors
    server = aio.server(
        interceptors=[
            AuthInterceptor(),
            LoggingInterceptor(),
        ],
        options=[
            # Maximum message size: 50 MB (default is 4 MB)
            ("grpc.max_receive_message_length", 50 * 1024 * 1024),
            ("grpc.max_send_message_length", 50 * 1024 * 1024),
            # Keep-alive: ping clients every 30 seconds
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 5000),
        ],
    )

    pb2_grpc.add_DocumentClassifierServicer_to_server(
        DocumentClassifierServicer(model), server
    )

    # Reflection allows grpcurl and other tools to discover your service
    from grpc_reflection.v1alpha import reflection
    reflection.enable_server_reflection(
        [
            pb2.DESCRIPTOR.services_by_name["DocumentClassifier"].full_name,
            reflection.SERVICE_NAME,
        ],
        server,
    )

    listen_addr = "0.0.0.0:50051"
    server.add_insecure_port(listen_addr)

    logger.info(f"Classification gRPC server starting on {listen_addr}")
    await server.start()

    try:
        await server.wait_for_termination()
    except asyncio.CancelledError:
        # Graceful shutdown - wait up to 5 seconds for in-flight RPCs
        await server.stop(grace=5)
        logger.info("gRPC server stopped")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve())

The gRPC Client

# upload_service/clients/classifier.py
import logging
from typing import NoReturn, Optional

import grpc
from grpc import aio

import classification_pb2 as pb2
import classification_pb2_grpc as pb2_grpc
from upload_service.exceptions import ClassificationServiceUnavailableError

logger = logging.getLogger("upload_service.classifier_client")

class ClassifierGRPCClient:
    """
    Async gRPC client for the Classification Service.
    Manages one persistent channel; HTTP/2 multiplexes concurrent RPCs over it.
    """

    def __init__(self, host: str, port: int = 50051):
        self._host = host
        self._port = port
        self._channel: Optional[aio.Channel] = None
        self._stub: Optional[pb2_grpc.DocumentClassifierStub] = None

    async def connect(self) -> None:
        """Create the gRPC channel. Called at service startup."""
        self._channel = aio.insecure_channel(
            f"{self._host}:{self._port}",
            options=[
                ("grpc.max_receive_message_length", 50 * 1024 * 1024),
                # Enable the retry policy declared in the service config below
                ("grpc.enable_retries", 1),
                # Service config for automatic retries. retryPolicy must sit
                # inside a methodConfig entry that names the service.
                ("grpc.service_config", """{
                    "methodConfig": [{
                        "name": [{"service": "classification.DocumentClassifier"}],
                        "retryPolicy": {
                            "maxAttempts": 3,
                            "initialBackoff": "0.5s",
                            "maxBackoff": "5s",
                            "backoffMultiplier": 2,
                            "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
                        }
                    }]
                }"""),
            ],
        )
        self._stub = pb2_grpc.DocumentClassifierStub(self._channel)
        # Channels connect lazily, so this does not prove the server is reachable
        logger.info(f"gRPC channel created for Classification Service at {self._host}:{self._port}")

    async def close(self) -> None:
        """Close the channel. Called at service shutdown."""
        if self._channel:
            await self._channel.close()

    async def health_check(self) -> bool:
        """Check if the Classification Service is reachable."""
        from grpc_health.v1 import health_pb2, health_pb2_grpc
        health_stub = health_pb2_grpc.HealthStub(self._channel)
        try:
            response = await health_stub.Check(
                # Must match the name the server registers with its health servicer
                health_pb2.HealthCheckRequest(service="classification.DocumentClassifier"),
                timeout=2.0,
            )
            return response.status == health_pb2.HealthCheckResponse.SERVING
        except grpc.RpcError:
            return False

    async def classify(
        self,
        text: str,
        document_id: str,
        correlation_id: str = "",
        timeout_seconds: float = 30.0,
    ) -> dict:
        """Unary RPC call with timeout and error mapping."""
        if not self._stub:
            raise RuntimeError("Client not connected. Call connect() first.")

        request = pb2.ClassifyRequest(
            text=text,
            document_id=document_id,
            max_results=3,
        )

        # Send correlation ID as gRPC metadata (equivalent to HTTP headers)
        metadata = []
        if correlation_id:
            metadata.append(("x-correlation-id", correlation_id))

        try:
            response: pb2.ClassifyResponse = await self._stub.Classify(
                request,
                timeout=timeout_seconds,
                metadata=metadata,
            )
            return {
                "document_id": response.document_id,
                "model_version": response.model_version,
                "processing_time_ms": response.processing_time_ms,
                "results": [
                    {
                        "label": pb2.DocumentType.Name(r.document_type),
                        "confidence": r.confidence,
                        "keywords": list(r.keywords),
                    }
                    for r in response.results
                ],
            }
        except grpc.RpcError as exc:
            self._handle_grpc_error(exc)

    def _handle_grpc_error(self, exc: grpc.RpcError) -> NoReturn:
        """Map gRPC errors to domain exceptions. Always raises."""
        code = exc.code()
        details = exc.details()

        logger.error(
            f"gRPC error: {code.name} - {details}",
            extra={"grpc_code": code.name, "grpc_details": details},
        )

        if code == grpc.StatusCode.UNAVAILABLE:
            raise ClassificationServiceUnavailableError(
                f"Classification Service is unreachable: {details}"
            )
        elif code == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise ClassificationServiceUnavailableError(
                "Classification Service timed out"
            )
        elif code == grpc.StatusCode.INVALID_ARGUMENT:
            raise ValueError(f"Invalid classification request: {details}")
        elif code == grpc.StatusCode.INTERNAL:
            raise ClassificationServiceUnavailableError(
                f"Classification Service internal error: {details}"
            )
        else:
            raise ClassificationServiceUnavailableError(
                f"Unexpected gRPC error {code.name}: {details}"
            )

Part 4: Server-Side Streaming

Server-side streaming is ideal for returning large results progressively.

# classification_service/server.py - add to DocumentClassifierServicer

async def ClassifyBatch(
    self,
    request: pb2.ClassifyBatchRequest,
    context: grpc.aio.ServicerContext,
) -> None:
    """
    Server-side streaming RPC.
    Client sends ONE request (a batch).
    Server streams MANY responses (one per document).
    The method signature returns None - you write to the context instead.
    """
    logger.info(f"ClassifyBatch called with {len(request.requests)} documents")
    loop = asyncio.get_running_loop()

    for i, classify_request in enumerate(request.requests):
        # Check if client cancelled between items
        if context.cancelled():
            logger.info("Client cancelled batch classification")
            return

        try:
            results = await loop.run_in_executor(
                None, self._model.classify, classify_request.text, 3
            )
            response = pb2.ClassifyResponse(
                document_id=classify_request.document_id,
                model_version=self._model.version,
            )
            for label, confidence, keywords in results:
                result = pb2.ClassificationResult(
                    document_type=label, confidence=confidence
                )
                result.keywords.extend(keywords)
                response.results.append(result)

            # Stream this result back immediately - don't wait for all
            await context.write(
                pb2.ClassifyBatchResponse(
                    result=response,
                    sequence_number=i,
                    is_last=(i == len(request.requests) - 1),
                )
            )
        except Exception as exc:
            logger.error(f"Failed to classify document {classify_request.document_id}: {exc}")
            # Continue with other documents rather than failing the whole batch.
            # If the final document fails, no message carries is_last=True;
            # the client still stops when the stream ends.

# Client consuming the stream
async def classify_batch(self, texts_and_ids: list[tuple[str, str]]) -> list[dict]:
    """Client-side iteration over a server-streaming RPC."""
    batch_request = pb2.ClassifyBatchRequest(
        requests=[
            pb2.ClassifyRequest(text=text, document_id=doc_id)
            for text, doc_id in texts_and_ids
        ],
        include_keywords=True,
    )

    results = []
    # The aio stub returns a call object that supports async iteration
    async for batch_response in self._stub.ClassifyBatch(batch_request, timeout=120.0):
        results.append({
            "sequence": batch_response.sequence_number,
            "document_id": batch_response.result.document_id,
            "label": batch_response.result.results[0].document_type
            if batch_response.result.results else None,
        })
        if batch_response.is_last:
            break

    return results

Part 5: Client-Side Streaming

Client-side streaming is ideal for large file uploads broken into chunks.

# classification_service/server.py - add to DocumentClassifierServicer
# Requires `from typing import AsyncIterator` at the top of the module.

async def UploadDocument(
    self,
    request_iterator: AsyncIterator[pb2.DocumentChunk],
    context: grpc.aio.ServicerContext,
) -> pb2.UploadResponse:
    """
    Client-side streaming RPC.
    Client streams MANY requests (file chunks).
    Server returns ONE response (upload complete).
    """
    chunks = []
    document_id = None
    mime_type = None
    total_bytes = 0

    # Iterate over the client's stream of chunks: the request iterator
    # is the async-iterable argument, not the context
    async for chunk in request_iterator:
        if document_id is None:
            document_id = chunk.document_id
            mime_type = chunk.mime_type

        chunks.append(chunk.data)
        total_bytes += len(chunk.data)

        logger.debug(
            f"Received chunk {chunk.chunk_number} for doc {chunk.document_id} "
            f"({len(chunk.data)} bytes)"
        )

        if total_bytes > 50 * 1024 * 1024:  # 50 MB limit
            await context.abort(
                grpc.StatusCode.INVALID_ARGUMENT,
                "Document exceeds maximum size of 50 MB",
            )
            return pb2.UploadResponse()

    if document_id is None:
        # The client closed the stream without sending any chunk
        await context.abort(grpc.StatusCode.INVALID_ARGUMENT, "No chunks received")

    # Reassemble the file
    file_bytes = b"".join(chunks)

    # Store it (self._storage is assumed to be an async object store set in __init__)
    storage_key = await self._storage.put(
        f"raw/{document_id}", file_bytes, content_type=mime_type
    )

    return pb2.UploadResponse(
        document_id=document_id,
        total_bytes=total_bytes,
        total_chunks=len(chunks),
        storage_key=storage_key,
    )

# Upload client - streaming file chunks
async def upload_document(self, file_bytes: bytes, document_id: str, mime_type: str) -> dict:
    """Stream a file to the server in chunks."""
    CHUNK_SIZE = 256 * 1024  # 256 KB chunks

    async def chunk_generator():
        total_chunks = (len(file_bytes) + CHUNK_SIZE - 1) // CHUNK_SIZE
        for i, offset in enumerate(range(0, len(file_bytes), CHUNK_SIZE)):
            chunk_data = file_bytes[offset:offset + CHUNK_SIZE]
            yield pb2.DocumentChunk(
                data=chunk_data,
                document_id=document_id,
                chunk_number=i,
                is_last=(i == total_chunks - 1),
                mime_type=mime_type if i == 0 else "",  # Only send mime_type once
            )

    response = await self._stub.UploadDocument(chunk_generator(), timeout=60.0)
    return {
        "document_id": response.document_id,
        "total_bytes": response.total_bytes,
        "total_chunks": response.total_chunks,
        "storage_key": response.storage_key,
    }
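The chunking arithmetic used by the upload client can be checked without a gRPC server. The sketch below isolates the same ceiling-division and `is_last` logic on plain bytes; `split_into_chunks` and `reassemble` are hypothetical helper names introduced only for this illustration.

```python
def split_into_chunks(data: bytes, chunk_size: int):
    """Yield (chunk_number, chunk_bytes, is_last) tuples covering all of data."""
    total_chunks = (len(data) + chunk_size - 1) // chunk_size  # ceiling division
    for number, offset in enumerate(range(0, len(data), chunk_size)):
        yield number, data[offset:offset + chunk_size], number == total_chunks - 1

def reassemble(chunks) -> bytes:
    """Join chunk payloads back together in chunk-number order."""
    ordered = sorted(chunks, key=lambda chunk: chunk[0])
    return b"".join(payload for _, payload, _ in ordered)

data = bytes(range(256)) * 10  # 2,560 bytes of sample data
chunks = list(split_into_chunks(data, chunk_size=1000))

print([len(payload) for _, payload, _ in chunks])
print([is_last for _, _, is_last in chunks])
print(reassemble(chunks) == data)
```

Note that an empty input yields no chunks at all, so a server would never see a `document_id` for it; rejecting empty uploads on the client side avoids that case.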

Part 6: Bidirectional Streaming

Bidirectional streaming enables real-time, interactive protocols.

# Real-time classification: client streams requests as events, server streams results back

# classification_service/server.py
async def ClassifyStream(
    self,
    request_iterator,
    context: grpc.aio.ServicerContext,
):
    """
    Bidirectional streaming RPC.
    Both client and server stream simultaneously.
    Use for: real-time audio transcription, interactive NLP, game state sync.
    """
    correlation_id = None
    loop = asyncio.get_running_loop()

    async for event in request_iterator:
        if context.cancelled():
            return

        # The event has a oneof field - handle each case
        event_type = event.WhichOneof("event_type")
        correlation_id = event.correlation_id

        if event_type == "classification_started":
            classify_request = event.classification_started
            logger.info(
                f"Stream: classifying doc {classify_request.document_id}",
                extra={"correlation_id": correlation_id},
            )

            try:
                results = await loop.run_in_executor(
                    None, self._model.classify, classify_request.text, 3
                )

                response_msg = pb2.ClassifyResponse(
                    document_id=classify_request.document_id,
                    model_version=self._model.version,
                )
                for label, confidence, _ in results:
                    response_msg.results.append(
                        pb2.ClassificationResult(
                            document_type=label, confidence=confidence
                        )
                    )

                # Stream result back immediately
                await context.write(
                    pb2.ClassificationEvent(
                        correlation_id=correlation_id,
                        classification_complete=response_msg,
                    )
                )
            except Exception as exc:
                await context.write(
                    pb2.ClassificationEvent(
                        correlation_id=correlation_id,
                        classification_failed=pb2.ErrorEvent(
                            code="INFERENCE_ERROR",
                            message=str(exc),
                        ),
                    )
                )

# Client: send and receive simultaneously
async def classify_stream(self, documents: list[dict]) -> list[dict]:
    results = []
    call = self._stub.ClassifyStream()

    async def send_requests():
        for doc in documents:
            await call.write(pb2.ClassificationEvent(
                correlation_id=doc["correlation_id"],
                classification_started=pb2.ClassifyRequest(
                    text=doc["text"],
                    document_id=doc["id"],
                ),
            ))
        # Half-close: tell the server no more requests are coming
        await call.done_writing()

    # Send and receive concurrently
    send_task = asyncio.create_task(send_requests())

    async for response_event in call:
        event_type = response_event.WhichOneof("event_type")
        if event_type == "classification_complete":
            complete = response_event.classification_complete
            results.append({
                "id": complete.document_id,
                "label": complete.results[0].document_type if complete.results else None,
            })
        elif event_type == "classification_failed":
            results.append({
                "id": None,
                "error": response_event.classification_failed.message,
            })

    await send_task
    return results

Part 7: Interceptors

Interceptors are gRPC's middleware - they run around every RPC call.
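The "runs around every call" idea is easiest to see without any gRPC machinery. The following is a plain-asyncio analogy, not the grpc.aio interceptor API: each wrapper receives the next handler and decides whether and how to call it. All names here (`handler`, `logging_interceptor`, `auth_interceptor`, `chain`) are made up for the illustration.

```python
import asyncio

async def handler(request: str) -> str:
    """Stand-in for an RPC method implementation."""
    return f"classified:{request}"

def logging_interceptor(log: list):
    def wrap(next_handler):
        async def wrapped(request):
            log.append(f"start {request}")
            response = await next_handler(request)
            log.append(f"end {request}")
            return response
        return wrapped
    return wrap

def auth_interceptor(allowed: set):
    def wrap(next_handler):
        async def wrapped(request):
            if request not in allowed:
                raise PermissionError(request)  # short-circuit: handler never runs
            return await next_handler(request)
        return wrapped
    return wrap

def chain(inner, interceptors):
    """Wrap inner so that the first interceptor in the list runs outermost."""
    for wrap in reversed(interceptors):
        inner = wrap(inner)
    return inner

log: list = []
pipeline = chain(handler, [auth_interceptor({"doc-1"}), logging_interceptor(log)])
print(asyncio.run(pipeline("doc-1")))
print(log)
```

Because the auth wrapper is outermost, a rejected request never reaches the logging wrapper or the handler. The same ordering consideration applies to the `interceptors=[...]` list passed to a gRPC server.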

Server-Side Logging Interceptor

# classification_service/interceptors.py
import grpc
from grpc import aio
import logging
import time

logger = logging.getLogger("classification_service.interceptors")

class LoggingInterceptor(aio.ServerInterceptor):
    """Logs incoming unary RPCs with timing and metadata."""

    async def intercept_service(
        self,
        continuation,
        handler_call_details: grpc.HandlerCallDetails,
    ):
        method = handler_call_details.method

        # continuation() returns an RpcMethodHandler (or None for unknown methods),
        # not a callable; the servicer method lives on handler.unary_unary
        handler = await continuation(handler_call_details)

        # This sketch wraps unary-unary RPCs only. Streaming handlers pass through
        # unlogged; they need their own wrappers (unary_stream, stream_unary, ...).
        if handler is None or handler.request_streaming or handler.response_streaming:
            return handler

        inner = handler.unary_unary

        async def intercepted_handler(request, servicer_context):
            start = time.perf_counter()
            metadata = dict(servicer_context.invocation_metadata())
            correlation_id = metadata.get("x-correlation-id", "none")

            logger.info(
                f"RPC started: {method}",
                extra={"method": method, "correlation_id": correlation_id},
            )

            try:
                response = await inner(request, servicer_context)
                elapsed = (time.perf_counter() - start) * 1000
                logger.info(
                    f"RPC complete: {method} ({elapsed:.1f}ms)",
                    extra={"method": method, "duration_ms": elapsed, "correlation_id": correlation_id},
                )
                return response
            except Exception as exc:
                elapsed = (time.perf_counter() - start) * 1000
                logger.error(
                    f"RPC failed: {method} ({elapsed:.1f}ms): {exc}",
                    extra={"method": method, "duration_ms": elapsed, "error": str(exc)},
                )
                raise

        # Keep the original (de)serializers, otherwise messages arrive as raw bytes
        return grpc.unary_unary_rpc_method_handler(
            intercepted_handler,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

Server-Side Auth Interceptor

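# Assumed helpers, not defined in this lesson: verify_jwt() validates the token
# and returns the user; InvalidTokenError is the exception it raises on failure.
import contextvars

# Handlers read the authenticated user with current_user.get()
current_user = contextvars.ContextVar("current_user", default=None)

def _abort_handler(code: grpc.StatusCode, details: str):
    """Build a handler that rejects the RPC instead of running the servicer."""
    async def abort(request_or_iterator, servicer_context):
        await servicer_context.abort(code, details)
    return grpc.unary_unary_rpc_method_handler(abort)

class AuthInterceptor(aio.ServerInterceptor):
    """Validates JWT tokens in gRPC metadata."""

    UNAUTHENTICATED_METHODS = {"/grpc.health.v1.Health/Check"}

    async def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method

        if method in self.UNAUTHENTICATED_METHODS:
            return await continuation(handler_call_details)

        # Checking here, before the handler is chosen, covers unary and streaming RPCs
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get("authorization", "")

        if not token.startswith("Bearer "):
            return _abort_handler(
                grpc.StatusCode.UNAUTHENTICATED,
                "Missing or invalid Authorization metadata",
            )

        jwt_token = token[len("Bearer "):]
        try:
            user = await verify_jwt(jwt_token)
        except InvalidTokenError:
            return _abort_handler(
                grpc.StatusCode.UNAUTHENTICATED,
                "Invalid or expired token",
            )

        # Make user available to the handler. Assumption: the interceptor and the
        # handler run in the same asyncio task, so the ContextVar is visible there;
        # verify this for your grpcio version.
        current_user.set(user)
        return await continuation(handler_call_details)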

Client-Side Retry Interceptor

# upload_service/clients/interceptors.py
import asyncio
import logging
import random

import grpc

logger = logging.getLogger("upload_service.interceptors")

class RetryInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    """Client-side retry for transient failures."""

    RETRIABLE_CODES = {grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED}

    def __init__(self, max_retries: int = 3, initial_backoff: float = 0.5):
        self._max_retries = max_retries
        self._initial_backoff = initial_backoff

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        backoff = self._initial_backoff

        for attempt in range(self._max_retries + 1):
            try:
                # continuation() returns a call object; awaiting the call itself
                # yields the response or raises grpc.RpcError
                call = await continuation(client_call_details, request)
                return await call
            except grpc.RpcError as exc:
                if exc.code() not in self.RETRIABLE_CODES or attempt == self._max_retries:
                    raise
                jitter = random.uniform(0, backoff * 0.1)
                logger.warning(
                    f"Retrying gRPC call (attempt {attempt + 1}/{self._max_retries}), "
                    f"sleeping {backoff + jitter:.2f}s"
                )
                await asyncio.sleep(backoff + jitter)
                backoff = min(backoff * 2, 30.0)
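To see how long a caller may wait in the worst case, the backoff sequence can be computed on its own. This sketch mirrors the doubling-with-cap rule from the interceptor above and leaves out the random jitter so the result is deterministic; `backoff_schedule` is a hypothetical helper name.

```python
def backoff_schedule(
    max_retries: int = 3,
    initial: float = 0.5,
    multiplier: float = 2.0,
    cap: float = 30.0,
) -> list[float]:
    """Return the sleep before each retry, in seconds, without jitter."""
    delays = []
    backoff = initial
    for _ in range(max_retries):
        delays.append(backoff)
        backoff = min(backoff * multiplier, cap)
    return delays

default_delays = backoff_schedule()
long_delays = backoff_schedule(max_retries=8)

print(default_delays, sum(default_delays))
print(long_delays, sum(long_delays))
```

With the defaults the retries add at most 3.5 seconds of sleep on top of the RPC time itself, which matters when choosing the caller's overall timeout.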

Part 8: Error Handling

gRPC Status Codes

| StatusCode | HTTP Equivalent | When to Use |
| --- | --- | --- |
| OK | 200 | Success |
| CANCELLED | 499 | Client cancelled the request |
| UNKNOWN | 500 | Unknown error (fallback) |
| INVALID_ARGUMENT | 400 | Bad request data |
| DEADLINE_EXCEEDED | 504 | Operation timed out |
| NOT_FOUND | 404 | Resource not found |
| ALREADY_EXISTS | 409 | Duplicate resource |
| PERMISSION_DENIED | 403 | Authenticated but not authorised |
| UNAUTHENTICATED | 401 | Not authenticated |
| RESOURCE_EXHAUSTED | 429 | Rate limit, quota exceeded |
| FAILED_PRECONDITION | 422 | System not in required state |
| INTERNAL | 500 | Internal server error |
| UNAVAILABLE | 503 | Service temporarily unavailable |
| DATA_LOSS | 500 | Unrecoverable data loss |

Structured Error Details

# For rich error information, use google.rpc.Status with details
# (pip install grpcio-status, which pulls in googleapis-common-protos)
import grpc
from google.protobuf import any_pb2
from google.rpc import status_pb2, error_details_pb2
from grpc_status import rpc_status

async def Classify(self, request, context):
    if not request.text:
        # Rich error: BadRequest with field violations
        bad_request = error_details_pb2.BadRequest()
        field_violation = bad_request.field_violations.add()
        field_violation.field = "text"
        field_violation.description = "text is required and cannot be empty"

        # Status.details holds google.protobuf.Any messages, so pack the detail first
        packed_detail = any_pb2.Any()
        packed_detail.Pack(bad_request)

        await context.abort_with_status(
            rpc_status.to_status(
                status_pb2.Status(
                    code=grpc.StatusCode.INVALID_ARGUMENT.value[0],
                    message="Validation failed",
                    details=[packed_detail],
                )
            )
        )

# Client extracting the rich error:
try:
    response = await stub.Classify(request)
except grpc.RpcError as exc:
    status = rpc_status.from_call(exc)
    if status:
        for detail in status.details:
            if detail.Is(error_details_pb2.BadRequest.DESCRIPTOR):
                bad_request = error_details_pb2.BadRequest()
                detail.Unpack(bad_request)
                for violation in bad_request.field_violations:
                    print(f"Field '{violation.field}': {violation.description}")

Mapping gRPC Errors to HTTP in Gateway Services

# When your FastAPI gateway calls a gRPC service and needs to map errors to HTTP:
import grpc
from fastapi import Request
from fastapi.responses import JSONResponse

GRPC_TO_HTTP_STATUS = {
    grpc.StatusCode.OK: 200,
    grpc.StatusCode.INVALID_ARGUMENT: 400,
    grpc.StatusCode.NOT_FOUND: 404,
    grpc.StatusCode.ALREADY_EXISTS: 409,
    grpc.StatusCode.PERMISSION_DENIED: 403,
    grpc.StatusCode.UNAUTHENTICATED: 401,
    grpc.StatusCode.RESOURCE_EXHAUSTED: 429,
    grpc.StatusCode.FAILED_PRECONDITION: 422,
    grpc.StatusCode.INTERNAL: 500,
    grpc.StatusCode.UNAVAILABLE: 503,
    grpc.StatusCode.DEADLINE_EXCEEDED: 504,
    grpc.StatusCode.UNKNOWN: 500,
}

# FastAPI exception handler for gRPC errors
async def grpc_error_handler(request: Request, exc: grpc.RpcError) -> JSONResponse:
    # Codes missing from the table fall back to 500
    http_status = GRPC_TO_HTTP_STATUS.get(exc.code(), 500)
    return JSONResponse(
        status_code=http_status,
        content={
            "error_code": exc.code().name,
            "message": exc.details(),
            "request_id": getattr(request.state, "request_id", "unknown"),
        },
    )

# Register in FastAPI app (`app` is the gateway's existing FastAPI instance)
app.add_exception_handler(grpc.RpcError, grpc_error_handler)

Part 9: gRPC vs REST Decision Guide

Use this matrix for every new service communication interface:

| Question | If YES → | If NO → |
| --- | --- | --- |
| Will browser clients call this directly? | REST | Either |
| Is this called > 5,000 times/minute? | gRPC | REST is fine |
| Does it need streaming (server-push, live updates)? | gRPC | Either |
| Are multiple teams in different languages consuming it? | gRPC (schema enforced) | REST |
| Is it a public-facing API (external developers)? | REST | Either |
| Does the payload contain binary data (files, embeddings)? | gRPC | Either |
| Do you need bidirectional real-time communication? | gRPC | Either |
| Is operational simplicity the top priority? | REST | Either |
| Is latency < 5 ms critical? | gRPC | Either |

The practical rule:

  • Public APIs, browser calls, external developers → REST
  • Internal high-frequency service calls, ML serving, binary payloads, streaming → gRPC
  • Everything else → REST (simpler, more tooling, easier to debug)
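The practical rule can be written down as a small function, which is handy for design reviews. This is one possible encoding, not a definitive policy: the `Interface` fields and the `recommend` function are invented for this sketch, the 5,000 calls/minute threshold comes from the matrix above, and giving browser and public-API constraints priority over throughput is an assumption about how to break ties.

```python
from dataclasses import dataclass

@dataclass
class Interface:
    """Facts about a planned service-to-service interface."""
    browser_clients: bool = False
    public_api: bool = False
    calls_per_minute: int = 0
    needs_streaming: bool = False
    binary_payloads: bool = False

def recommend(interface: Interface) -> str:
    # Browser and external consumers rule out plain gRPC first
    if interface.browser_clients or interface.public_api:
        return "REST"
    # Internal, high-frequency, streaming, or binary-heavy traffic favours gRPC
    if (
        interface.calls_per_minute > 5_000
        or interface.needs_streaming
        or interface.binary_payloads
    ):
        return "gRPC"
    # Everything else: the simpler option
    return "REST"

print(recommend(Interface(calls_per_minute=10_000)))
print(recommend(Interface(public_api=True, calls_per_minute=10_000)))
print(recommend(Interface(calls_per_minute=10)))
```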

Interview Patterns

Q: What is the difference between the four gRPC streaming patterns?

A: Unary (one request, one response) works like a normal function call. Server-streaming (one request, many responses) is like a server-push feed - used for batch results, live notifications. Client-streaming (many requests, one response) is like chunked upload - client sends data, server returns one confirmation. Bidirectional streaming (both sides stream simultaneously) is like a real-time duplex channel - used for interactive protocols like transcription, game state sync.
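The shapes of the four patterns can be sketched with plain coroutines and async generators. This is an analogy using only asyncio, not the gRPC API; the function names are illustrative and there is no network involved.

```python
import asyncio
from typing import AsyncIterator

async def unary(request: str) -> str:
    """One request in, one response out."""
    return request.upper()

async def server_streaming(request: str) -> AsyncIterator[str]:
    """One request in, many responses out."""
    for word in request.split():
        yield word

async def client_streaming(requests: AsyncIterator[str]) -> int:
    """Many requests in, one response out."""
    count = 0
    async for _ in requests:
        count += 1
    return count

async def bidirectional(requests: AsyncIterator[str]) -> AsyncIterator[str]:
    """Many requests in, many responses out, interleaved."""
    async for item in requests:
        yield item.upper()

async def feed(items):
    """Helper that turns a list into an async stream of requests."""
    for item in items:
        yield item

async def main() -> dict:
    return {
        "unary": await unary("a b c"),
        "server_streaming": [w async for w in server_streaming("a b c")],
        "client_streaming": await client_streaming(feed(["a", "b", "c"])),
        "bidirectional": [r async for r in bidirectional(feed(["a", "b"]))],
    }

results = asyncio.run(main())
print(results)
```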

Q: Why does Protocol Buffers serialisation produce smaller payloads than JSON?

A: Three reasons. First, field names are not repeated - a field document_type in JSON takes 15 bytes as a string key in every message; in Protobuf it is encoded as a varint field number (typically 1 byte). Second, Protobuf uses efficient binary encoding for numbers (variable-length integers), while JSON encodes numbers as ASCII strings. Third, Protobuf has no whitespace. A typical message that is 500 bytes in JSON is 50–150 bytes in Protobuf.
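The size difference can be demonstrated by encoding a small message by hand. The sketch below implements just enough of the Protobuf wire format (varints, tags, and length-delimited strings) to encode three `ClassifyRequest` fields with the field numbers from the .proto file above; it handles non-negative integers only and is for illustration, not a replacement for generated code.

```python
import json

def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def encode_tag(field_number: int, wire_type: int) -> bytes:
    """A tag is the field number and wire type packed into one varint."""
    return encode_varint((field_number << 3) | wire_type)

def encode_string_field(field_number: int, value: str) -> bytes:
    data = value.encode("utf-8")
    return encode_tag(field_number, 2) + encode_varint(len(data)) + data  # 2 = length-delimited

def encode_int_field(field_number: int, value: int) -> bytes:
    return encode_tag(field_number, 0) + encode_varint(value)  # 0 = varint

fields = {"text": "hello", "document_id": "doc-001", "max_results": 3}
wire = (
    encode_string_field(1, fields["text"])
    + encode_string_field(2, fields["document_id"])
    + encode_int_field(4, fields["max_results"])
)
as_json = json.dumps(fields, separators=(",", ":")).encode("utf-8")

print(wire.hex())
print(len(wire), "bytes as Protobuf vs", len(as_json), "bytes as compact JSON")
```

Each field costs one tag byte instead of a quoted key, which is where most of the saving comes from on small messages. For payloads dominated by long strings, the ratio shrinks because the string bytes themselves are the same in both formats.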

Q: How do gRPC interceptors differ from FastAPI middleware?

A: Both are middleware, but scoped differently. FastAPI middleware operates at the HTTP request/response level - it sees raw HTTP headers, paths, status codes. gRPC interceptors operate at the RPC level - they see method names, protobuf messages, gRPC metadata, and StatusCode. The two are not interchangeable: FastAPI middleware never sees gRPC calls, and a gRPC interceptor never sees REST requests. For a service that exposes both REST and gRPC, you need both.

Q: How do you handle gRPC error propagation across service boundaries?

A: Map gRPC StatusCode to HTTP status codes in gateway services using a lookup table (UNAVAILABLE → 503, INVALID_ARGUMENT → 400, etc.). For richer error details, use google.rpc.Status with details field containing packed BadRequest, QuotaFailure, or ErrorInfo messages. Always propagate correlation IDs in gRPC metadata so errors can be traced across services in Jaeger.

© 2026 EngineersOfAI. All rights reserved.